package com.bizmda.bizsip.sample.sink.listener;

import cn.hutool.core.util.ObjectUtil;
import com.bizmda.bizsip.common.BizException;
import com.bizmda.bizsip.common.BizMessage;
import com.bizmda.bizsip.common.BizResultEnum;
import com.bizmda.bizsip.common.BizUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@Service
public class Sink9Listener {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private EventLoopGroup clientEventLoopGroup = new NioEventLoopGroup();
    private Bootstrap clientBootstrap = new Bootstrap();
    private ChannelFuture clientChannelFuture = null;

    private EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
    private EventLoopGroup workLoopGroup = new NioEventLoopGroup();
    private ServerBootstrap serverBootstrap = new ServerBootstrap();
    private ChannelFuture serverChannelFuture = null;
    private Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();

//    @RabbitListener(queues = "queue.bizsip.sink9")
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue.bizsip.sink9", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "exchange.dircect.bizsip.sink", type = ExchangeTypes.DIRECT),
            key = "key.bizsip.sink9"))

    public Object doService(Message message) {
        BizMessage outMessage;
        byte[] body;
        body = (byte[])jackson2JsonMessageConverter.fromMessage(message);
        log.debug("\nRabbitMQ收到消息:\n{}", BizUtils.buildHexLog(body));
//        String inMessage = new String(body);
        String correlationId = message.getMessageProperties().getCorrelationId();
        String repayTo = message.getMessageProperties().getReplyTo();
        Map dataMap = new HashMap(16);
        dataMap.put("correlationId",correlationId);
        dataMap.put("repayTo",repayTo);
        dataMap.put("body",body);
        byte[] dataBytes = ObjectUtil.serialize(dataMap);
        log.debug("\nTCP异步客户端发出消息:\n{}",BizUtils.buildHexLog(dataBytes));
        try {
            short length = (short)dataBytes.length;
            byte[] lengthBytes = new byte[2];
            lengthBytes[0] = (byte)(length >> 8 & 0xFF);
            lengthBytes[1] = (byte)(length & 0xFF);
            this.send(lengthBytes);
            this.send(dataBytes);
        } catch (BizException e) {
            e.printStackTrace();
            return null;
        }
        return null;
    }

    @PostConstruct
    public void init() {
        this.clientBootstrap.group(clientEventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) {
                        // 指定数据读写处理逻辑
                        channel.pipeline().addLast(new ByteArrayEncoder());
                    }
//
//                    @Override
//                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//                        log.info("Unexpected exception from downstream.", cause);
//                        ctx.close();
//                    }
                });

        this.serverBootstrap.group(bossLoopGroup, workLoopGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .option(ChannelOption.SO_KEEPALIVE, true)
                //注意服务端这里一定要用childHandler 不能用handler 否则会报错
                .childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) {
                        channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,2,0,2));
                        channel.pipeline().addLast(new ByteArrayDecoder());

                        //添加客户端和服务端之间的心跳检查状态
//                        channel.pipeline().addLast(new IdleStateHandler(6, 2, 1, TimeUnit.SECONDS));
                        channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                if (!(msg instanceof byte[])) {
                                    return;
                                }
                                log.debug("\nTCP异步服务端收到消息:\n{}",BizUtils.buildHexLog((byte[])msg));
                                Map dataMap = ObjectUtil.deserialize((byte[])msg);
                                String correlationId2 = (String)dataMap.get("correlationId");
                                String repayTo2 = (String)dataMap.get("repayTo");
                                byte[] body2 = (byte[])dataMap.get("body");
                                log.debug("\nRabbitMQ返回消息\n{}",BizUtils.buildHexLog(body2));
                                rabbitTemplate.convertAndSend("", repayTo2, body2,
                                        new MessagePostProcessor() {
                                            @Override
                                            public Message postProcessMessage(Message outMessage) {
                                                outMessage.getMessageProperties().setCorrelationId(correlationId2);
                                                outMessage.getMessageProperties().setDelay(5000);
                                                return outMessage;
                                            }
                                        });
                            }

                            @Override
                            public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                                    throws Exception {
                                if (evt instanceof IdleStateEvent) {
                                    IdleStateEvent event = (IdleStateEvent) evt;
                                    if (event.equals(IdleState.READER_IDLE)) {
                                        System.out.println("读空闲====");
                                        ctx.close();
                                    } else if (event.equals(IdleState.WRITER_IDLE)) {
                                        System.out.println("写空闲====");
                                    } else if (event.equals(IdleState.WRITER_IDLE)) {
                                        System.out.println("读写空闲====");
                                        ctx.channel().writeAndFlush("ping\r\n");
                                    }
                                }
                                super.userEventTriggered(ctx, evt);
                            }
                        });
                        channel.pipeline().addLast(new ByteArrayEncoder());

                    }
                });
        this.serverChannelFuture = this.serverBootstrap.bind(new InetSocketAddress(4445));
    }

    @PreDestroy
    public void destory() {
        try {
            this.clientChannelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("通联关闭失败", e);
            return;
        }
        try {
            this.serverChannelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            this.bossLoopGroup.shutdownGracefully();
            this.workLoopGroup.shutdownGracefully();
        }
    }


    private void send(byte[] msg) throws BizException {
        if (this.clientChannelFuture == null || !this.clientChannelFuture.channel().isActive()) {
            try {
                this.clientChannelFuture = this.clientBootstrap.connect("localhost", 4444).sync();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new BizException(BizResultEnum.CONNECTOR_TCP_CONNECT_ERROR,e);
            }
        }
        this.clientChannelFuture.channel().writeAndFlush(msg);
    }
}
